{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# ParameterServerStrategy on Hops\n",
    "---\n",
    "\n",
    "<h3><font color='red'>Tested with TensorFlow 1.11</font></h3>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def distributed_training():\n",
    "    \"\"\"Train and evaluate a small Keras binary classifier as an Estimator.\n",
    "\n",
    "    The Keras model is converted to an Estimator whose RunConfig uses\n",
    "    ParameterServerStrategy for training and MirroredStrategy for evaluation.\n",
    "    Checkpoints are written to the directory returned by tensorboard.logdir().\n",
    "\n",
    "    Imports are kept inside the function body.\n",
    "    NOTE(review): presumably because hops ships this function to remote\n",
    "    executors; confirm against the hops documentation.\n",
    "    \"\"\"\n",
    "    import numpy as np\n",
    "    import tensorflow as tf\n",
    "\n",
    "    from hops import tensorboard\n",
    "    from hops import devices\n",
    "\n",
    "    def input_fn():\n",
    "        \"\"\"Build a dataset of random inputs and random binary labels.\n",
    "\n",
    "        1024 examples with 10 float32 features each, repeated 500 times and\n",
    "        grouped into batches of 32.\n",
    "        \"\"\"\n",
    "        x = np.random.random((1024, 10))\n",
    "        y = np.random.randint(2, size=(1024, 1))\n",
    "        x = tf.cast(x, tf.float32)\n",
    "        dataset = tf.data.Dataset.from_tensor_slices((x, y))\n",
    "        dataset = dataset.repeat(500)\n",
    "        dataset = dataset.batch(32)\n",
    "        return dataset\n",
    "\n",
    "    model_dir = tensorboard.logdir()\n",
    "    print('Using %s to store checkpoints.' % model_dir)\n",
    "\n",
    "    # Define a Keras Model: 10 inputs -> 16 ReLU units -> 1 sigmoid output.\n",
    "    model = tf.keras.Sequential()\n",
    "    model.add(tf.keras.layers.Dense(16, activation='relu', input_shape=(10,)))\n",
    "    model.add(tf.keras.layers.Dense(1, activation='sigmoid'))\n",
    "\n",
    "    # Compile the model.\n",
    "    optimizer = tf.train.GradientDescentOptimizer(0.2)\n",
    "    model.compile(loss='binary_crossentropy', optimizer=optimizer)\n",
    "    model.summary()\n",
    "    tf.keras.backend.set_learning_phase(True)\n",
    "\n",
    "    # Query the GPU count once; both strategies are configured with it.\n",
    "    num_gpus = devices.get_num_gpus()\n",
    "\n",
    "    # Define DistributionStrategies and convert the Keras Model to an\n",
    "    # Estimator that utilizes these DistributionStrategies.\n",
    "    # Evaluator is a single worker, so using MirroredStrategy.\n",
    "    distribute_config = tf.contrib.distribute.DistributeConfig(\n",
    "        train_distribute=tf.contrib.distribute.ParameterServerStrategy(\n",
    "            num_gpus_per_worker=num_gpus),\n",
    "        eval_distribute=tf.contrib.distribute.MirroredStrategy(\n",
    "            num_gpus_per_worker=num_gpus))\n",
    "    run_config = tf.estimator.RunConfig(experimental_distribute=distribute_config)\n",
    "    keras_estimator = tf.keras.estimator.model_to_estimator(\n",
    "        keras_model=model, config=run_config, model_dir=model_dir)\n",
    "\n",
    "    # Train and evaluate the model. Evaluation will be skipped if there is not an\n",
    "    # \"evaluator\" job in the cluster.\n",
    "    tf.estimator.train_and_evaluate(\n",
    "        keras_estimator,\n",
    "        train_spec=tf.estimator.TrainSpec(input_fn=input_fn),\n",
    "        eval_spec=tf.estimator.EvalSpec(input_fn=input_fn))\n",
    "    \n",
    "# Hand distributed_training to the hops experiment.parameter_server launcher.\n",
    "# NOTE(review): presumably hops runs the function on the cluster's workers and\n",
    "# parameter servers; confirm against the hops documentation.\n",
    "from hops import experiment\n",
    "experiment.parameter_server(distributed_training)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "",
   "name": "pysparkkernel"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "python",
    "version": 2
   },
   "mimetype": "text/x-python",
   "name": "pyspark",
   "pygments_lexer": "python2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
